package com.atguigu.day08

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
 * Streaming word count driven by a user-defined receiver.
 *
 * Builds a local `StreamingContext` with a 5-second batch interval, attaches a
 * `MyReceiver` as the input source, and for every batch splits each received
 * record on single spaces, counts the occurrences of each token, and prints
 * the resulting (token, count) pairs.
 */
object $03_UserDefinedSource {

  /**
   * Entry point: wires up the streaming pipeline and blocks until the
   * streaming context terminates.
   *
   * @param args command-line arguments; not read by this program
   */
  def main(args: Array[String]): Unit = {

    // Local master with 4 threads; the batch interval below is 5 seconds.
    val sparkConf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("test")
    val ssc = new StreamingContext(sparkConf, Seconds(5))

    // Only log at error level to keep the printed batch output readable.
    ssc.sparkContext.setLogLevel("error")

    // NOTE(review): MyReceiver is defined outside this file; its arguments are
    // presumably the host and port to read from — confirm against its definition.
    val lines = ssc.receiverStream(new MyReceiver("hadoop102", 9999))

    // Per-batch word count: tokenize, pair each token with 1, sum by token.
    val words = lines.flatMap(line => line.split(" "))
    val pairs = words.map(word => (word, 1))
    val counts = pairs.reduceByKey((left, right) => left + right)
    counts.print()

    // Nothing runs until the context is started; then block the main thread.
    ssc.start()
    ssc.awaitTermination()
  }
}
